package com.dtwave.cfstreaming.filter;

import com.alibaba.fastjson.JSONObject;
import com.dtwave.cfstreaming.filter.function.FlatMapFunction;
import com.dtwave.cfstreaming.filter.function.NotNullFilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * Static helper that wires the record clean-up operators onto an incoming stream of
 * raw string records.
 */
public class StreamDataSplit {
    /**
     * Applies {@link FlatMapFunction} to every incoming record and then keeps only the
     * elements accepted by {@link NotNullFilterFunction}.
     *
     * <p>According to the original author's note, the combined effect is to drop empty and
     * erroneous records as well as every record that is not an insert or an update
     * (operation types I and U). That logic lives in the two function classes, which are
     * not visible here - NOTE(review): confirm against their implementations.
     *
     * @param kafkaDataStream stream of raw string records (presumably read from Kafka,
     *                        going by the parameter name)
     * @return the operator produced by the final filter step, emitting {@link JSONObject}
     *         elements
     */
    public static SingleOutputStreamOperator<JSONObject> filter(DataStream<String> kafkaDataStream) {
        // Stage 1: turn each raw string into zero or more JSON objects.
        SingleOutputStreamOperator<JSONObject> flattened =
                kafkaDataStream.flatMap(new FlatMapFunction());

        // Stage 2: discard whatever the not-null filter rejects.
        return flattened.filter(new NotNullFilterFunction());
    }
}
